package org.luosl.webmagicx

import java.util
import java.util.concurrent.atomic.AtomicInteger

import org.luosl.webmagicx.Utils.Logging
import org.luosl.webmagicx.conf.SpiderConf
import org.quartz.{Trigger, _}
import org.quartz.impl.{JobDetailImpl, StdSchedulerFactory}
import us.codecraft.webmagic.Spider
import us.codecraft.webmagic.Spider.Status

import scala.collection.JavaConverters._

/**
  * Created by luosl on 2017/12/13.
  */
class TaskMonitor {

  private val schedulerFactory:StdSchedulerFactory = new StdSchedulerFactory()
  private val scheduler:Scheduler = schedulerFactory.getScheduler()
  private val incrementId:AtomicInteger = new AtomicInteger(1)
  private val registerSpiders:util.HashMap[String,(SpiderConf,Spider)] = new util.HashMap[String,(SpiderConf,Spider)]()

  scheduler.start()

  /**
    * 提交一个任务
    * @param spider spider
    * @param spiderConf spiderConf
    */
  def submitSpiderTask(spider: Spider,spiderConf: SpiderConf): Unit ={
    val job:JobDetail = buildJob(spider,spiderConf,s"spider${incrementId.getAndIncrement()}")
      if(spiderConf.task.isDefined){
        subCornJob(job,spiderConf.task.get.corn)
        if(spiderConf.task.get.startNow){
          val startNowJob:JobDetail =  buildJob(spider,spiderConf,s"spider${incrementId.getAndIncrement()}")
          subStartNowJob(startNowJob)
        }
      }else{
        subStartNowJob(job)
      }
    registerSpiders.put(spiderConf.id,(spiderConf, spider))
  }

  private def subStartNowJob(job:JobDetail): Unit ={
    val cornTrigger = TriggerBuilder.newTrigger()
      .withIdentity("startNow", "group1")
      .build()
    scheduler.scheduleJob(job,cornTrigger)
  }

  private def subCornJob(job:JobDetail,corn:String): Unit ={
    val cornTrigger = TriggerBuilder.newTrigger().withIdentity("corn", "group1")
      .withSchedule(CronScheduleBuilder.cronSchedule(corn))
      .build()
    scheduler.scheduleJob(job,cornTrigger)
  }

  private def buildJob(spider: Spider,spiderConf: SpiderConf, jobId:String):JobDetail = {
    val jobData:JobDataMap = new JobDataMap(Map("spider"->spider,"spiderConf"->spiderConf).asJava)
    JobBuilder.newJob(classOf[StartSpiderJob])
      .withIdentity(jobId, "group1")
      .setJobData(jobData)
      .build()
  }

  def shutdown(): Unit ={
    scheduler.shutdown()
  }

  def runTaskNumber():Int = {
    val onceNum:Int = registerSpiders.values().asScala
      .count(tu=> tu._1.task.isEmpty && (tu._2.getStatus == Status.Running || tu._2.getStatus == Status.Init))
    val cornNum:Int = registerSpiders.values().asScala.count(tu=> tu._1.task.isDefined)
    onceNum + cornNum
  }

}

class StartSpiderJob extends Job with Logging{
  override def execute(context: JobExecutionContext): Unit = {
    val spider:Spider = context.getJobDetail.getJobDataMap.get("spider").asInstanceOf[Spider]
    val sc:SpiderConf = context.getJobDetail.getJobDataMap.get("spiderConf").asInstanceOf[SpiderConf]
    if(spider.getStatus != Status.Running){
      sc.startUrls.foreach(url=>spider.addUrl(url))
      spider.start()
      logInfo(s"spider[path=${sc.path}]启动成功...")
    }else{
      logInfo(s"spider[path=${sc.path}]正在运行...")
    }
  }
}
